{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright (c) MONAI Consortium  \n",
    "Licensed under the Apache License, Version 2.0 (the \"License\");  \n",
    "you may not use this file except in compliance with the License.  \n",
    "You may obtain a copy of the License at  \n",
    "&nbsp;&nbsp;&nbsp;&nbsp;http://www.apache.org/licenses/LICENSE-2.0  \n",
    "Unless required by applicable law or agreed to in writing, software  \n",
    "distributed under the License is distributed on an \"AS IS\" BASIS,  \n",
    "WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  \n",
    "See the License for the specific language governing permissions and  \n",
    "limitations under the License."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Deploying a MedNIST Classifier with Ray\n",
    "\n",
    "This notebook demos the process of deploying a network with Ray Serve as a web service. Ray provides various ways of deploying models with existing platforms like AWS or Azure but we'll focus on local deployment here since researchers are more likely to do this. Ray also provides other libraries for tuning, reinforcement learning, and distributed training in addition to deployment. This tutorial will use MedNIST classifier from the BentoML tutorial so please run at least the training component of that notebook first. The documentation on Ray Serve [start here](https://docs.ray.io/en/master/serve/index.html#rayserve), this notebook will be using the Pytorch specific functionality [discussed here](https://docs.ray.io/en/master/serve/tutorials/pytorch.html)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup environment\n",
    "To start install the Ray Serve component:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "!pip install \"ray[serve]\" \"protobuf<4\"\n",
    "!python -c \"import monai\" || pip install -q \"monai-weekly[gdown, tqdm]\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup imports\n",
    "The imports for MONAI are the same as for the BentoML tutorial (assuming it's already installed):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "MONAI version: 1.1.0+11.g7de6c336.dirty\n",
      "Numpy version: 1.22.2\n",
      "Pytorch version: 1.13.0+cu117\n",
      "MONAI flags: HAS_EXT = False, USE_COMPILED = False, USE_META_DICT = False\n",
      "MONAI rev id: 7de6c33656a99087ca3b89a817b0879cf093febc\n",
      "MONAI __file__: /workspace/Code/MONAI/monai/__init__.py\n",
      "\n",
      "Optional dependencies:\n",
      "Pytorch Ignite version: 0.4.10\n",
      "Nibabel version: 4.0.2\n",
      "scikit-image version: 0.19.3\n",
      "Pillow version: 9.0.1\n",
      "Tensorboard version: 2.11.0\n",
      "gdown version: 4.6.0\n",
      "TorchVision version: 0.14.0+cu117\n",
      "tqdm version: 4.64.1\n",
      "lmdb version: 1.3.0\n",
      "psutil version: 5.9.2\n",
      "pandas version: 1.1.5\n",
      "einops version: 0.6.0\n",
      "transformers version: 4.21.3\n",
      "mlflow version: 2.0.1\n",
      "pynrrd version: 1.0.0\n",
      "\n",
      "For details about installing the optional dependencies, please visit:\n",
      "    https://docs.monai.io/en/latest/installation.html#installing-the-recommended-dependencies\n",
      "\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "import io\n",
    "from PIL import Image\n",
    "import torch\n",
    "import numpy as np\n",
    "import requests\n",
    "\n",
    "import ray\n",
    "from ray import serve\n",
    "\n",
    "from monai.apps import download_url\n",
    "from monai.config import print_config\n",
    "from monai.transforms import (\n",
    "    EnsureChannelFirst,\n",
    "    Compose,\n",
    "    ScaleIntensity,\n",
    "    EnsureType,\n",
    ")\n",
    "\n",
    "\n",
    "print_config()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "resource = \"https://drive.google.com/uc?id=1zKRi5FrwEES_J-AUkM7iBJwc__jy6ct6\"\n",
    "dst = os.path.join(\"..\", \"bentoml\", \"classifier.zip\")\n",
    "if not os.path.exists(dst):\n",
    "    download_url(resource, dst)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This class will represent the service for the model, which accepts an image sent as the body of a POST request and returns the class name in a JSON structure. Note that this class uses asyncio to define the `__call__` to be compatible with the server backend."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "MEDNIST_CLASSES = [\"AbdomenCT\", \"BreastMRI\", \"CXR\", \"ChestCT\", \"Hand\", \"HeadCT\"]\n",
    "\n",
    "\n",
    "@serve.deployment\n",
    "class MedNISTClassifier:\n",
    "    def __init__(self):\n",
    "        # create the transform for normalizing the image data\n",
    "        self.transform = Compose([EnsureChannelFirst(channel_dim=\"no_channel\"), ScaleIntensity(), EnsureType()])\n",
    "        # load the network on the CPU for simplicity and in eval mode\n",
    "        self.net = torch.jit.load(\"../bentoml/classifier.zip\", map_location=\"cpu\").eval()\n",
    "\n",
    "    async def __call__(self, request):\n",
    "        image_bytes = await request.body()\n",
    "        img = Image.open(io.BytesIO(image_bytes))\n",
    "        img = np.array(img)\n",
    "        image_tensor = self.transform(img)\n",
    "\n",
    "        with torch.no_grad():\n",
    "            outputs = self.net(image_tensor[None].float())\n",
    "\n",
    "        _, output_classes = outputs.max(dim=1)\n",
    "\n",
    "        return {\"class_index\": MEDNIST_CLASSES[output_classes[0]]}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now the server is started and the classifier backend is associated with an endpoint, which is the route to the service relate to the server address."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2021-10-08 14:49:07,551\tINFO services.py:1250 -- View the Ray dashboard at \u001b[1m\u001b[32mhttp://127.0.0.1:8265\u001b[39m\u001b[22m\n",
      "\u001b[2m\u001b[36m(pid=2240518)\u001b[0m 2021-10-08 14:49:08,881\tINFO checkpoint_path.py:15 -- Using RayInternalKVStore for controller checkpoint and recovery.\n",
      "\u001b[2m\u001b[36m(pid=2240518)\u001b[0m 2021-10-08 14:49:08,886\tINFO http_state.py:75 -- Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:ZAewXP:SERVE_PROXY_ACTOR-node:10.246.179.34-0' on node 'node:10.246.179.34-0' listening on '127.0.0.1:8000'\n",
      "2021-10-08 14:49:09,168\tINFO api.py:455 -- Started Serve instance in namespace 'serve'.\n",
      "2021-10-08 14:49:09,185\tINFO api.py:243 -- Updating deployment 'MedNISTClassifier'. component=serve deployment=MedNISTClassifier\n",
      "\u001b[2m\u001b[36m(pid=2240517)\u001b[0m INFO:     Started server process [2240517]\n",
      "\u001b[2m\u001b[36m(pid=2240518)\u001b[0m 2021-10-08 14:49:09,214\tINFO backend_state.py:896 -- Adding 1 replicas to deployment 'MedNISTClassifier'. component=serve deployment=MedNISTClassifier\n",
      "2021-10-08 14:49:11,727\tINFO api.py:250 -- Deployment 'MedNISTClassifier' is ready at `http://127.0.0.1:8000/MedNISTClassifier`. component=serve deployment=MedNISTClassifier\n"
     ]
    }
   ],
   "source": [
    "serve.start()\n",
    "MedNISTClassifier.deploy()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With the server running in another process we can send it a query with an image and get a response. By default the server will listen on port 8000."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'class_index': 'Hand'}\n"
     ]
    }
   ],
   "source": [
    "image_bytes = open(\"./hand.jpg\", \"rb\").read()\n",
    "\n",
    "resp = requests.post(\"http://localhost:8000/MedNISTClassifier\", data=image_bytes)\n",
    "print(resp.json())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This can also be done on the command line with `curl`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{\n",
      "  \"class_index\": \"Hand\"\n",
      "}"
     ]
    }
   ],
   "source": [
    "!curl -X POST \"http://localhost:8000/MedNISTClassifier\" --data-binary \"@hand.jpg\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally shut down the server:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "ray.shutdown()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Command Line Usage\n",
    "\n",
    "Ray can be started on the command line. Since it operates as a cluster of nodes the first thing to do is create the head node locally then start the serve component:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2021-10-08 14:51:40,274\tINFO scripts.py:606 -- Local node IP: 10.246.179.34\n",
      "2021-10-08 14:51:41,677\tSUCC scripts.py:645 -- --------------------\n",
      "2021-10-08 14:51:41,677\tSUCC scripts.py:646 -- Ray runtime started.\n",
      "2021-10-08 14:51:41,677\tSUCC scripts.py:647 -- --------------------\n",
      "2021-10-08 14:51:41,677\tINFO scripts.py:649 -- Next steps\n",
      "2021-10-08 14:51:41,677\tINFO scripts.py:650 -- To connect to this Ray runtime from another node, run\n",
      "2021-10-08 14:51:41,677\tINFO scripts.py:654 --   ray start --address='10.246.179.34:6379' --redis-password='5241590000000000'\n",
      "2021-10-08 14:51:41,677\tINFO scripts.py:659 -- Alternatively, use the following Python code:\n",
      "2021-10-08 14:51:41,678\tINFO scripts.py:662 -- import ray\n",
      "2021-10-08 14:51:41,678\tINFO scripts.py:663 -- ray.init(address='auto', _redis_password='5241590000000000')\n",
      "2021-10-08 14:51:41,678\tINFO scripts.py:671 -- To connect to this Ray runtime from outside of the cluster, for example to\n",
      "2021-10-08 14:51:41,678\tINFO scripts.py:673 -- connect to a remote cluster from your laptop directly, use the following\n",
      "2021-10-08 14:51:41,678\tINFO scripts.py:675 -- Python code:\n",
      "2021-10-08 14:51:41,678\tINFO scripts.py:678 -- import ray\n",
      "2021-10-08 14:51:41,678\tINFO scripts.py:679 -- ray.init(address='ray://<head_node_ip_address>:10001')\n",
      "2021-10-08 14:51:41,678\tINFO scripts.py:685 -- If connection fails, check your firewall settings and network configuration.\n",
      "2021-10-08 14:51:41,678\tINFO scripts.py:690 -- To terminate the Ray runtime, run\n",
      "2021-10-08 14:51:41,678\tINFO scripts.py:691 --   ray stop\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2021-10-08 14:51:41,553\tINFO services.py:1250 -- View the Ray dashboard at \u001b[1m\u001b[32mhttp://127.0.0.1:8265\u001b[39m\u001b[22m\n",
      "2021-10-08 14:51:42,287\tINFO worker.py:826 -- Connecting to existing Ray cluster at address: 10.246.179.34:6379\n",
      "\u001b[2m\u001b[36m(pid=2246270)\u001b[0m 2021-10-08 14:51:43,070\tINFO checkpoint_path.py:15 -- Using RayInternalKVStore for controller checkpoint and recovery.\n",
      "\u001b[2m\u001b[36m(pid=2246270)\u001b[0m 2021-10-08 14:51:43,102\tINFO http_state.py:75 -- Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:SERVE_PROXY_ACTOR-node:10.246.179.34-0' on node 'node:10.246.179.34-0' listening on '127.0.0.1:8000'\n",
      "2021-10-08 14:51:43,781\tINFO api.py:455 -- Started detached Serve instance in namespace 'serve'.\n",
      "\u001b[2m\u001b[36m(pid=2246314)\u001b[0m INFO:     Started server process [2246314]\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "\n",
    "ray start --head\n",
    "serve start"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A separate script with very similar code can then be used to add or replace the backend. This would be useful in an experimental setting where the server is running constantly in the background to which you can push updates quickly as you edit your script."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Overwriting mednist_classifier_start.py\n"
     ]
    }
   ],
   "source": [
    "%%writefile mednist_classifier_start.py\n",
    "\n",
    "import io\n",
    "from PIL import Image\n",
    "import torch\n",
    "import numpy as np\n",
    "\n",
    "import ray\n",
    "from ray import serve\n",
    "\n",
    "from monai.config import print_config\n",
    "from monai.transforms import (\n",
    "    EnsureChannelFirst,\n",
    "    Compose,\n",
    "    ScaleIntensity,\n",
    "    EnsureType,\n",
    ")\n",
    "\n",
    "MEDNIST_CLASSES = [\"AbdomenCT\", \"BreastMRI\", \"CXR\", \"ChestCT\", \"Hand\", \"HeadCT\"]\n",
    "\n",
    "ray.init(address=\"auto\", namespace=\"serve\")\n",
    "\n",
    "\n",
    "@serve.deployment\n",
    "class MedNISTClassifier:\n",
    "    def __init__(self):\n",
    "        self.transform = Compose([EnsureChannelFirst(channel_dim=\"no_channel\"), ScaleIntensity(), EnsureType()])\n",
    "        self.net = torch.jit.load(\"../bentoml/classifier.zip\", map_location=\"cpu\").eval()\n",
    "\n",
    "    async def __call__(self, request):\n",
    "        image_bytes = await request.body()\n",
    "        img = Image.open(io.BytesIO(image_bytes))\n",
    "        img = np.array(img)\n",
    "        image_tensor = self.transform(img)\n",
    "\n",
    "        with torch.no_grad():\n",
    "            outputs = self.net(image_tensor[None].float())\n",
    "\n",
    "        _, output_classes = outputs.max(dim=1)\n",
    "\n",
    "        return {\"class_index\": MEDNIST_CLASSES[output_classes[0]]}\n",
    "\n",
    "\n",
    "MedNISTClassifier.deploy()\n",
    "\n",
    "# ray.init(address=\"auto\")\n",
    "# client = serve.connect()\n",
    "\n",
    "# # remove previous instance of this backend if present\n",
    "# if \"classifier\" in client.list_backends():\n",
    "#     client.delete_endpoint(\"classifier\")\n",
    "#     client.delete_backend(\"classifier\")\n",
    "\n",
    "# client.create_backend(\"classifier\", MedNISTClassifier)\n",
    "# client.create_endpoint(\"classifier\", backend=\"classifier\", route=\"/image_classify\", methods=[\"POST\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The endpoint is then added by running the script:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2021-10-08 14:52:18,337\tINFO worker.py:826 -- Connecting to existing Ray cluster at address: 10.246.179.34:6379\n",
      "2021-10-08 14:52:18,493\tINFO api.py:243 -- Updating deployment 'MedNISTClassifier'. component=serve deployment=MedNISTClassifier\n",
      "\u001b[2m\u001b[36m(pid=2246270)\u001b[0m 2021-10-08 14:52:18,533\tINFO backend_state.py:896 -- Adding 1 replicas to deployment 'MedNISTClassifier'. component=serve deployment=MedNISTClassifier\n",
      "2021-10-08 14:52:21,277\tINFO api.py:250 -- Deployment 'MedNISTClassifier' is ready at `http://127.0.0.1:8000/MedNISTClassifier`. component=serve deployment=MedNISTClassifier\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "\n",
    "python mednist_classifier_start.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And checked once again for response:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{\n",
      "  \"class_index\": \"Hand\"\n",
      "}"
     ]
    }
   ],
   "source": [
    "!curl -X POST \"http://localhost:8000/MedNISTClassifier\" --data-binary \"@hand.jpg\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally the service can be stopped:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2021-10-08 14:52:33,709\tINFO scripts.py:861 -- Did not find any active Ray processes.\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "\n",
    "ray stop"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
